/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.venn.source.kafka;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;

/**
 * A version-agnostic Kafka {@link UpsertStreamTableSink}.
 *
 * <p>The version-specific Kafka producers need to extend this class and
 * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}.
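 *
 * <p>A minimal subclass sketch, for illustration only: the name
 * {@code Kafka011UpsertTableSink} is hypothetical, and the
 * {@code FlinkKafkaProducer011} constructor shown should be checked against
 * the connector version actually on the classpath.
 *
 * <pre>{@code
 * public class Kafka011UpsertTableSink extends KafkaUpsertTableSinkBase {
 *
 *     // constructor forwards all arguments to the base class; the remaining
 *     // UpsertStreamTableSink methods (setKeyFields, setIsAppendOnly,
 *     // getRecordType) are omitted for brevity
 *
 *     protected SinkFunction<Row> createKafkaProducer(
 *             String topic,
 *             Properties properties,
 *             SerializationSchema<Row> serializationSchema,
 *             Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 *         return new FlinkKafkaProducer011<>(
 *                 topic, serializationSchema, properties, partitioner);
 *     }
 * }
 * }</pre>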
 */
@Internal
public abstract class KafkaUpsertTableSinkBase implements UpsertStreamTableSink<Row> {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaUpsertTableSinkBase.class);

    /**
     * The schema of the table.
     */
    private final TableSchema schema;

    /**
     * The Kafka topic to write to.
     */
    protected final String topic;

    /**
     * Properties for the Kafka producer.
     */
    protected final Properties properties;

    /**
     * Serialization schema for encoding records to Kafka.
     */
    protected final SerializationSchema<Row> serializationSchema;

    /**
     * Partitioner to select Kafka partition for each item.
     */
    protected final Optional<FlinkKafkaPartitioner<Row>> partitioner;

    protected KafkaUpsertTableSinkBase(
            TableSchema schema,
            String topic,
            Properties properties,
            Optional<FlinkKafkaPartitioner<Row>> partitioner,
            SerializationSchema<Row> serializationSchema) {
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
        this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
        this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "Serialization schema must not be null.");
    }

    /**
     * Returns the version-specific Kafka producer.
     *
     * @param topic               Kafka topic to produce to.
     * @param properties          Properties for the Kafka producer.
     * @param serializationSchema Serialization schema to use to create Kafka records.
     * @param partitioner         Partitioner to select Kafka partition.
     * @return The version-specific Kafka producer
     */
    protected abstract SinkFunction<Row> createKafkaProducer(
            String topic,
            Properties properties,
            SerializationSchema<Row> serializationSchema,
            Optional<FlinkKafkaPartitioner<Row>> partitioner);

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        final SinkFunction<Row> kafkaProducer = createKafkaProducer(
                topic,
                properties,
                serializationSchema,
                partitioner);
        // update by venn: convert the upsert stream into an append-only stream,
        // forwarding upsert messages and dropping retract messages
        return dataStream
                .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Row>() {
                    @Override
                    public void flatMap(Tuple2<Boolean, Row> element, Collector<Row> out) throws Exception {
                        // f0 == true marks an upsert message, f0 == false marks a retract (delete) message
                        if (element.f0) {
                            out.collect(element.f1);
                        } else {
                            LOG.debug("KafkaUpsertTableSinkBase: dropping retract message: {}", element.f1);
                        }
                    }
                })
                .addSink(kafkaProducer)
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }

    @Override
    public KafkaUpsertTableSinkBase configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final KafkaUpsertTableSinkBase that = (KafkaUpsertTableSinkBase) o;
        return Objects.equals(schema, that.schema) &&
                Objects.equals(topic, that.topic) &&
                Objects.equals(properties, that.properties) &&
                Objects.equals(serializationSchema, that.serializationSchema) &&
                Objects.equals(partitioner, that.partitioner);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
                schema,
                topic,
                properties,
                serializationSchema,
                partitioner);
    }
}
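
// Hypothetical usage sketch (comment only): none of the names below are defined in
// this file. Kafka011UpsertTableSink is an assumed subclass like the one sketched in
// the class Javadoc, and tableEnv is an existing StreamTableEnvironment.
//
//   KafkaUpsertTableSinkBase sink = new Kafka011UpsertTableSink(
//           schema,
//           "output-topic",
//           producerProperties,
//           Optional.empty(),
//           serializationSchema);
//   tableEnv.registerTableSink("kafka_upsert_sink", sink);
//   tableEnv.sqlUpdate(
//           "INSERT INTO kafka_upsert_sink SELECT word, COUNT(*) FROM source GROUP BY word");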
